
[FEAT] Enable buffered iteration on plans #2566

Merged · 6 commits into main · Jul 30, 2024
Conversation

@jaychia (Contributor) commented Jul 26, 2024

Helps close part of #2561

This PR adds buffering of result partition tasks, preventing "runaway execution" when multiple executions run concurrently.

The problem previously was that if we ran two executions in parallel (e1 and e2) on a machine with 8 CPUs:

  1. e1 could potentially run 8 tasks and keep them buffered (not releasing the resource request)
  2. When e2 attempts to run the next task, it notices that the task cannot be admitted on the system (due to memory constraints)
    • e2 concludes that it is deadlocked, because the pyrunner today strongly assumes that if a task cannot be admitted, the execution only needs to wait for some of its own tasks to finish.
    • However, e2 has no tasks currently pending (it is starved): the pending tasks are all buffered in e1. It therefore incorrectly reports a deadlock.

Solution

  • This PR sets the default buffer size to 1, instead of allowing each execution to buffer as many tasks as it wants
  • We introduce logic in the physical plan to have an upper limit on the size of the materialization buffer. If that buffer gets too large, it will start yielding None to indicate that the plan is unable to proceed.

Note that a problem still remains, e.g. when running more than NUM_CPUS executions concurrently. That can be solved in a follow-up PR refactoring the way we do resource accounting.
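The buffering behavior described above can be sketched roughly as follows. This is an illustrative toy, not Daft's actual physical-plan code: `Task`, `buffered_plan`, and `max_buffer_size` are hypothetical names, and the real materialization buffer lives inside the physical plan generators.

```python
from collections import deque


class Task:
    """Toy stand-in for a partition task (hypothetical interface)."""

    def __init__(self, name):
        self.name = name
        self._done = False

    def finish(self):
        self._done = True

    def done(self):
        return self._done


def buffered_plan(tasks, max_buffer_size=1):
    """Yield tasks for execution, but never keep more than
    `max_buffer_size` unfinished tasks buffered. When the buffer is
    full, yield None so the caller knows the plan is blocked (waiting
    on materialization) rather than deadlocked.
    """
    buffer = deque()
    for task in tasks:
        while len(buffer) >= max_buffer_size:
            if buffer[0].done():
                buffer.popleft()  # a result materialized; free a slot
            else:
                yield None  # backpressure signal: cannot proceed yet
        buffer.append(task)
        yield task
```

With `max_buffer_size=1`, a second execution never sees one plan hoarding every CPU: after admitting one task, the plan yields `None` until that task's result is consumed, releasing resources for other executions.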

@github-actions github-actions bot added the enhancement New feature or request label Jul 26, 2024

codecov bot commented Jul 29, 2024

Codecov Report

Attention: Patch coverage is 91.35802% with 7 lines in your changes missing coverage. Please review.

Please upload a report for BASE (main@4701290).
Report is 3 commits behind head on main.

Additional details and impacted files


@@           Coverage Diff           @@
##             main    #2566   +/-   ##
=======================================
  Coverage        ?   64.02%           
=======================================
  Files           ?      951           
  Lines           ?   107920           
  Branches        ?        0           
=======================================
  Hits            ?    69101           
  Misses          ?    38819           
  Partials        ?        0           
Files with coverage changes (patch coverage in angle brackets):
daft/plan_scheduler/physical_plan_scheduler.py 90.90% <100.00%> (ø)
daft/runners/ray_runner.py 90.26% <100.00%> (ø)
daft/dataframe/dataframe.py 88.29% <80.00%> (ø)
daft/runners/pyrunner.py 93.57% <83.33%> (ø)
daft/execution/physical_plan.py 94.59% <91.93%> (ø)

@desmondcheongzx (Contributor) left a comment


Looks good! Some minor nits that would be good to fix if the docstrings are public-facing.

Two review comments on daft/dataframe/dataframe.py (outdated, resolved)
jaychia and others added 2 commits July 29, 2024 20:06
@jaychia jaychia enabled auto-merge (squash) July 30, 2024 03:08
@jaychia jaychia merged commit 4fec71c into main Jul 30, 2024
44 checks passed
@jaychia jaychia deleted the jay/fix-concurrent-iters branch July 30, 2024 03:27
jaychia added a commit that referenced this pull request Jul 30, 2024
Together with #2566 , closes #2561 

This PR changes the way the PyRunner performs resource accounting.
Instead of updating the number of CPUs, GPUs and memory used only when
futures are retrieved, we do this just before each task completes. These
variables are protected with a lock to allow for concurrent access from
across worker threads.

Additionally, this PR now tracks the inflight `Futures` across all
executions globally in the PyRunner singleton. This is because there
will be instances where a single execution might not be able to make
forward progress (e.g. there are only 8 CPUs available, and there are 8
other currently-executing partitions). In this case, we need to wait for
**some** execution globally to complete before attempting to make
forward progress on the current execution.

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
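The lock-protected resource accounting described in that follow-up commit can be sketched as below. This is a minimal illustration under assumed names (`ResourceAccountant`, `try_admit`, `release` are not Daft's actual API); the real PyRunner additionally tracks inflight futures globally across executions.

```python
import threading


class ResourceAccountant:
    """Illustrative sketch: global CPU/memory accounting shared by all
    executions, guarded by a lock so worker threads can update it
    concurrently without races.
    """

    def __init__(self, total_cpus, total_memory):
        self._lock = threading.Lock()
        self._free_cpus = total_cpus
        self._free_memory = total_memory

    def try_admit(self, cpus, memory):
        """Admit a task only if its resource request fits right now."""
        with self._lock:
            if cpus <= self._free_cpus and memory <= self._free_memory:
                self._free_cpus -= cpus
                self._free_memory -= memory
                return True
            return False

    def release(self, cpus, memory):
        """Return a task's resources as it completes, so a starved
        execution elsewhere can make forward progress."""
        with self._lock:
            self._free_cpus += cpus
            self._free_memory += memory
```

Because admission and release both take the same lock, an execution that fails `try_admit` can safely wait for any other execution's task to call `release`, rather than assuming the blocking task belongs to itself.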